package com.lianda.watermark;

import com.lianda.state.BroadcastStateMain;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import javax.annotation.Nullable;
import java.net.URL;
import java.util.Properties;


/**
 * Demonstrates event-time watermark generation and observes its effect on
 * tumbling-window results.
 * Reference: https://blog.csdn.net/wangpei1949/article/details/102883709
 */
@Slf4j
public class DataStreamEventTimeWatermarkMain {
    public static void main(String[] args) throws Exception {
        //1. Parse configuration from watermark.properties on the classpath.
        URL fileUrl = BroadcastStateMain.class.getClassLoader().getResource("watermark.properties");
        if (fileUrl == null) {
            // Fail fast with a clear message instead of an opaque NullPointerException
            // from fileUrl.getPath() below when the resource is missing.
            throw new IllegalStateException("watermark.properties not found on the classpath");
        }
        ParameterTool parameterTool = ParameterTool
                .fromPropertiesFile(fileUrl.getPath());

        String bootstrapServers = parameterTool.getRequired("bootstrapServers");
        String topic = parameterTool.getRequired("topic");
        String groupID = parameterTool.getRequired("groupID");
        int maxOutOfOrderness = parameterTool.getInt("maxOutOfOrderness"); // e.g. 10 seconds
        int windowLength = parameterTool.getInt("windowLength"); // e.g. 30 seconds
        int allowedLateness = parameterTool.getInt("allowedLateness"); // e.g. 5 seconds

        //2. Set up a local execution environment with the web UI on port 8081.
        Configuration config = new Configuration();
        config.setInteger(ConfigOptions.key("rest.port").defaultValue(8081),8081);
        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1); // single parallel instance so watermark progress is easy to follow

        //3. Add the Kafka source, starting from the latest offsets.
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers",bootstrapServers);
        kafkaProperties.put("group.id",groupID);
        FlinkKafkaConsumer011<String> kafkaConsumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), kafkaProperties);
        kafkaConsumer.setStartFromLatest();
        DataStream<String> source = env.addSource(kafkaConsumer).rebalance();

        //4. Parse the raw CSV records into (eventTime, eventType) tuples.
        SingleOutputStreamOperator<Tuple2<String, String>> parsedData = source.process(new CustomProcessFunctionParseLog());

        //5. Extract event timestamps and generate periodic watermarks.
        SingleOutputStreamOperator<Tuple2<String, String>> watermarkedData = parsedData
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrderness)));

        //6. Count elements per event type in tumbling event-time windows.
        watermarkedData.keyBy((KeySelector<Tuple2<String, String>, String>) value -> value.f1)
                .window(TumblingEventTimeWindows.of(Time.seconds(windowLength)))
                .allowedLateness(Time.seconds(allowedLateness))
                .process(new CustomProcessWindowFunction());

        //7. Execute the job.
        env.execute(DataStreamEventTimeWatermarkMain.class.getSimpleName());
    }

    /**
     * Window function that counts the elements per key in each tumbling window
     * and emits a human-readable summary line (also logged at WARN for visibility).
     */
    public static class CustomProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, String>, String, String, TimeWindow> {

        /** UTC+8 zone, built once instead of on every window firing. */
        private static final DateTimeZone ZONE_UTC_PLUS_8 = DateTimeZone.forID("+08:00");

        /** Pattern used to render window boundaries. */
        private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

        // Index of this parallel subtask, cached in open() for use in the output record.
        int subTaskID;

        @Override
        public void open(Configuration parameters) throws Exception {
            subTaskID = getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, String>> elements,
                            Collector<String> out) throws Exception {
            // Number of elements that fell into this window for this key.
            int count = Iterables.size(elements);

            TimeWindow window = context.window();
            String windowStart = new DateTime(window.getStart(), ZONE_UTC_PLUS_8)
                    .toString(TIME_PATTERN);
            String windowEnd = new DateTime(window.getEnd(), ZONE_UTC_PLUS_8)
                    .toString(TIME_PATTERN);

            String record ="SubtaskID: "+subTaskID+ " WindowRange: " + windowStart
                    + " ~ " + windowEnd + " Key: " + key + " Count: " + count;

            log.warn(record);

            out.collect(record);
        }
    }

    /**
     * Parses CSV records received from Kafka ("eventTime,eventType") into
     * (eventTime, eventType) tuples. Malformed records are logged and dropped
     * so that a single bad message never fails the job.
     */
    public static class CustomProcessFunctionParseLog extends ProcessFunction<String,Tuple2<String, String>> {
        @Override
        public void processElement(String value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
            try {
                log.info("收到Kafka数据,Record: {}", value);
                String[] values = value.split(",");
                // Guard explicitly against records with fewer than two fields
                // instead of relying on ArrayIndexOutOfBoundsException being
                // caught by the handler below.
                if (values.length < 2) {
                    log.warn("解析Kafka数据异常,Record: {}", value);
                    return;
                }
                String eventTime=values[0];
                String eventType=values[1];

                Tuple2<String, String> parsedValue = new Tuple2<>(eventTime, eventType);
                out.collect(parsedValue);
            } catch (Exception ex) {
                // Best-effort parsing: log the failure and skip the record.
                log.error("解析Kafka数据异常,Record: {}", value, ex);
            }
        }
    }

    /**
     * Periodic watermark assigner with a fixed bounded out-of-orderness,
     * adapted from Flink's built-in BoundedOutOfOrdernessTimestampExtractor
     * with extra logging to make watermark progress easy to observe.
     * The watermark emission interval (period) can be set via
     * StreamExecutionEnvironment.getConfig().setAutoWatermarkInterval(milliseconds);
     * under EventTime it defaults to 200 ms.
     */
    public static class BoundedOutOfOrdernessTimestampExtractor
            implements AssignerWithPeriodicWatermarks<Tuple2<String, String>> {
        private static final long serialVersionUID = 1L;

        // Joda-Time formatters are immutable and thread-safe: build once
        // instead of on every extractTimestamp() call.
        private static final DateTimeFormatter EVENT_TIME_FORMAT =
                DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

        // Largest event timestamp (ms) seen so far.
        private long currentMaxTimestamp;

        // Last watermark emitted; watermarks must never move backwards.
        private long lastEmittedWatermark = Long.MIN_VALUE;

        // Maximum allowed out-of-orderness in milliseconds.
        private final long maxOutOfOrderness;

        public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
            if (maxOutOfOrderness.toMilliseconds() < 0) {
                // IllegalArgumentException (a RuntimeException subclass, so
                // backward-compatible) is the idiomatic type for a bad argument.
                throw new IllegalArgumentException("Tried to set the maximum allowed " +
                        "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
            }

            this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
            // Initialize so that (currentMaxTimestamp - maxOutOfOrderness) == Long.MIN_VALUE,
            // i.e. the watermark makes no progress until the first element arrives.
            this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            // The watermark trails the max timestamp seen by the allowed lateness
            // and is monotonically non-decreasing.
            long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
            if (potentialWM >= lastEmittedWatermark) {
                lastEmittedWatermark = potentialWM;
            }
            Watermark watermark = new Watermark(lastEmittedWatermark);
            log.warn("当前水印: "+new DateTime(watermark.getTimestamp(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss"));
            return watermark;
        }

        @Override
        public long extractTimestamp(Tuple2<String, String> element, long previousElementTimestamp) {
            // element.f0 is the event-time string, e.g. "2019-10-20 12:00:00".
            long timestamp = DateTime.parse(element.f0, EVENT_TIME_FORMAT).getMillis();
            currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
            return timestamp;
        }
    }
}
